package org.xqh.study.mq.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Minimal Kafka consumer demo: subscribes to {@code KafkaDict.CONSUMER_TOPIC} and logs the
 * key, value and topic of every record it receives until the JVM is asked to shut down.
 *
 * <p>The consumer is created once, when this class is loaded, and is only ever used from the
 * thread that runs {@link #main(String[])}. On JVM shutdown (for example Ctrl+C) a shutdown
 * hook asks the polling loop to stop and waits for it, so the consumer is closed on the
 * polling thread before the process exits.
 *
 * @author xuqianghui
 * @version 1.0
 * @since 2020/9/22
 */
@Slf4j
public class KafkaConsumerTest {

    /** Shared consumer instance; created and subscribed in the static initializer below. */
    private static final KafkaConsumer<String, String> consumer;

    /** Cleared by the shutdown hook to make the polling loop in {@code main} exit. */
    private static volatile boolean running = true;

    // Build the consumer and subscribe it to the configured topic as soon as the class loads.
    static {
        Properties configs = initConfig();
        consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(KafkaDict.CONSUMER_TOPIC));
    }

    /**
     * Builds the consumer configuration from the constants in {@code KafkaDict}.
     *
     * <p>Keys and values are deserialized as strings, and {@code auto.offset.reset} is set to
     * {@code earliest}.
     *
     * @return the properties used to construct the {@link KafkaConsumer}
     */
    private static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaDict.MQ_ADDRESS_COLLECTION);
        props.put("group.id", KafkaDict.CONSUMER_GROUP_ID);
        props.put("enable.auto.commit", KafkaDict.CONSUMER_ENABLE_AUTO_COMMIT);
        props.put("auto.commit.interval.ms", KafkaDict.CONSUMER_AUTO_COMMIT_INTERVAL_MS);
        props.put("session.timeout.ms", KafkaDict.CONSUMER_SESSION_TIMEOUT_MS);
        props.put("max.poll.records", KafkaDict.CONSUMER_MAX_POLL_RECORDS);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // TODO: Kafka SASL authentication. When enabling it, set "security.protocol"
        //       (e.g. SASL_PLAINTEXT), "sasl.mechanism" (e.g. PLAIN) and "sasl.jaas.config",
        //       and load the username/password from external configuration. Credentials
        //       must never be written into source code, not even in a comment.

        return props;
    }

    /**
     * Polls the subscribed topic and logs each record until the JVM begins shutting down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final Thread pollingThread = Thread.currentThread();

        // KafkaConsumer must be closed by the thread that uses it, so the hook only raises the
        // stop flag and then waits for the polling thread to finish its cleanup.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            running = false;
            try {
                pollingThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "kafka-consumer-shutdown"));

        try {
            // The flag is re-checked after every poll, so shutdown latency is bounded by the
            // poll timeout.
            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(KafkaDict.CONSUMER_POLL_TIME_OUT);
                for (ConsumerRecord<String, String> record : records) {
                    log.info("revice: key ==={} value ===={} topic ==={}",
                            record.key(), record.value(), record.topic());
                }
            }
        } finally {
            // Always release network resources and leave the consumer group cleanly, whether
            // the loop ended normally or poll threw.
            consumer.close();
        }
    }
}
